package com.pai4j.domain.vo.llm;
import com.google.gson.Gson;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import okhttp3.*;
import okio.BufferedSource;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * @author: CYM-pai
 * @date: 2025/05/05 15:05
 **/
public class DeepSeekAIClient {

    private static final String DEFAULT_BASE_URL = "https://api.deepseek.com";

    private static final String CHAT_COMPLETION_SUFFIX = "/chat/completions";
    private static final String MODELS_SUFFIX = "/models";
    private static final String FILES_SUFFIX = "/files";

    private String baseUrl;
    private String apiKey;

    public DeepSeekAIClient(String apiKey) {
        this(apiKey, DEFAULT_BASE_URL);
    }

    public DeepSeekAIClient(String apiKey, String baseUrl) {
        this.apiKey = apiKey;
        if (baseUrl.endsWith("/")) {
            baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
        }
        this.baseUrl = baseUrl;
    }

    public String getChatCompletionUrl() {
        return baseUrl + CHAT_COMPLETION_SUFFIX;
    }

    public String getModelsUrl() {
        return baseUrl + MODELS_SUFFIX;
    }

    public String getFilesUrl() {
        return baseUrl + FILES_SUFFIX;
    }

    public String getApiKey() {
        return apiKey;
    }

    public ModelsList ListModels() throws IOException {
        OkHttpClient client = new OkHttpClient();
        okhttp3.Request request = new okhttp3.Request.Builder()
                .url(getModelsUrl())
                .addHeader("Authorization", "Bearer " + getApiKey())
                .build();
        try {
            okhttp3.Response response = client.newCall(request).execute();
            String body = response.body().string();
            Gson gson = new Gson();
            return gson.fromJson(body, ModelsList.class);
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        }
    }


    public ChatCompletionResponse ChatCompletion(ChatCompletionRequest request) throws IOException {
        request.stream = false;
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(2, TimeUnit.MINUTES)
                .readTimeout(2, TimeUnit.MINUTES)
                .writeTimeout(2, TimeUnit.MINUTES)
                .build();
        okhttp3.MediaType mediaType = okhttp3.MediaType.parse("application/json");
        okhttp3.RequestBody body = okhttp3.RequestBody.create(mediaType, new Gson().toJson(request));
        okhttp3.Request httpRequest = new okhttp3.Request.Builder()
                .url(getChatCompletionUrl())
                .addHeader("Authorization", "Bearer " + getApiKey())
                .addHeader("Content-Type", "application/json")
                .post(body)
                .build();
        try {
            okhttp3.Response response = client.newCall(httpRequest).execute();
            String responseBody = response.body().string();
            System.out.println(responseBody);
            Gson gson = new Gson();
            return gson.fromJson(responseBody, ChatCompletionResponse.class);
        } catch (IOException e) {
            e.printStackTrace();
            throw e;
        }
    }

    // return a stream of ChatCompletionStreamResponse
    public Flowable<ChatCompletionStreamResponse> ChatCompletionStream(ChatCompletionRequest request) throws IOException {
        request.stream = true;
        OkHttpClient client = new OkHttpClient();
        okhttp3.MediaType mediaType = okhttp3.MediaType.parse("application/json");
        okhttp3.RequestBody body = okhttp3.RequestBody.create(mediaType, new Gson().toJson(request));
        okhttp3.Request httpRequest = new okhttp3.Request.Builder()
                .url(getChatCompletionUrl())
                .addHeader("Authorization", "Bearer " + getApiKey())
                .addHeader("Content-Type", "application/json")
                .post(body)
                .build();
        okhttp3.Response response = client.newCall(httpRequest).execute();
        if (response.code() != 200) {
            throw new RuntimeException("Failed to start stream: " + response.body().string());
        }

        // get response body line by line
        return Flowable.create(emitter -> {
            okhttp3.ResponseBody responseBody = response.body();
            if (responseBody == null) {
                emitter.onError(new RuntimeException("Response body is null"));
                return;
            }
            String line;
            while ((line = responseBody.source().readUtf8Line()) != null) {
                if (line.startsWith("data:")) {
                    line = line.substring(5);
                    line = line.trim();
                }
                if (Objects.equals(line, "[DONE]")) {
                    emitter.onComplete();
                    return;
                }
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                Gson gson = new Gson();
                ChatCompletionStreamResponse streamResponse = gson.fromJson(line, ChatCompletionStreamResponse.class);
                emitter.onNext(streamResponse);
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
    }
    
    
    public Flux<String> ChatCompletionStreamV3(ChatCompletionRequest request) throws IOException {
        request.stream = true;
        RequestBody body = RequestBody.create(
                MediaType.parse("application/json"),
                new Gson().toJson(request)
        );
        
        Request httpRequest = new Request.Builder()
                .url(getChatCompletionUrl())
                .addHeader("Authorization", "Bearer " + getApiKey())
                .addHeader("Accept", "text/event-stream")
                .post(body)
                .build();
        
        // 使用动态背压策略：初始缓冲，后期根据处理速度调整
        return Flux.create(emitter -> {
            try (Response response = SHARED_HTTP_CLIENT.newCall(httpRequest).execute()) {
                emitter.onDispose(() -> {
                    if (response != null) response.close();
                });
                
                if (!response.isSuccessful()) {
                    emitter.error(new IOException("Unexpected response code: " + response.code()));
                    return;
                }
                
                try (BufferedSource source = response.body().source()) {
                    while (!emitter.isCancelled()) {
                        String line = source.readUtf8Line();
                        if (line == null) break;
                        
                        if (line.startsWith("data:")) {
                            line = line.substring(5).trim();
                            if (line.equals("[DONE]")) {
                                emitter.complete();
                                break;
                            }
                            
                            if (!line.isEmpty()) {
                                try {
                                    ChatCompletionStreamResponse chunk = new Gson().fromJson(
                                            line, ChatCompletionStreamResponse.class);
                                    String content = chunk.getChoices().get(0).getDelta().getContent();
                                    if (content != null) {
                                        // 实时输出到控制台（可选）
                                        System.out.println(content);
                                        // 流式发送到订阅者
                                        emitter.next(content);
                                    }
                                } catch (Exception e) {
                                    emitter.error(e);
                                    break;
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                emitter.error(e);
            }
        }, FluxSink.OverflowStrategy.DROP); // 改用DROP策略避免内存堆积
    }
    
}
